package haproxy

import (
	"bufio"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/exec"
	"strings"
	"time"
)

// Settings that are bound to command-line flags in init.
var (
	k8sHost   string // address of the Kubernetes API server (flag "host")
	k8sPrefix string // API path prefix appended to k8sHost (flag "pre")
	haconfig  string // path of the haproxy configuration file (flag "haconfig")
	hareload  string // command line that reloads haproxy (flag "hareload")
)

// Public ports are handed out starting at 10000; 3000 of them are available.
const (
	pordBegin = 10000
	pordCount = 3000
)

// PortToPubport is one port mapping: WriteCfg makes haproxy bind Pubport and
// forward to Port on every node, using Protocol as the haproxy mode.
type PortToPubport struct {
	Port, Pubport int    // backend port on the nodes, public port bound by haproxy
	Protocol      string // stored lower-cased by Add
}

// PordsCreater hands out public ports to services and remembers which
// service port owns which public port. It is saved to and restored from
// pords.json as JSON.
type PordsCreater struct {
	// FreePords holds the ports that are still available; a slot whose port
	// has been handed out is set to 0.
	FreePords []int
	// The key is namespace + service + port.
	SvcPordsMap map[string]PortToPubport
}

// Init prepares pc for use, restoring the state saved in pords.json (in the
// working directory) when that is possible:
//   - if the file cannot be opened, pc starts with no mappings and every
//     port of the range [pordBegin, pordBegin+pordCount) free;
//   - if the file cannot be read, the process exits via log.Fatal;
//   - if the file cannot be parsed, the problem is logged and pc starts
//     fresh, as if the file did not exist;
//   - if the file parses but its free list does not have exactly pordCount
//     slots, the free list is rebuilt from the restored mappings.
//
// After Init returns, SvcPordsMap is never nil and FreePords always has
// pordCount slots.
func (pc *PordsCreater) Init() {
	// rebuild derives the free list from the current mappings: slot i holds
	// port pordBegin+i unless some mapping already owns that port.
	rebuild := func() {
		used := make(map[int]bool, len(pc.SvcPordsMap))
		for _, p := range pc.SvcPordsMap {
			used[p.Pubport] = true
		}
		pc.FreePords = make([]int, pordCount)
		for i := range pc.FreePords {
			if port := pordBegin + i; !used[port] {
				pc.FreePords[i] = port
			}
		}
	}

	pc.FreePords = nil
	pc.SvcPordsMap = make(map[string]PortToPubport)

	file, err := os.Open("pords.json")
	if err != nil {
		rebuild() // no saved state: everything is free
		return
	}
	defer file.Close()

	data, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatal("pords.json file is error ", err)
	}

	if err := json.Unmarshal(data, pc); err != nil {
		// A damaged file must not leave pc half-populated.
		log.Println("parse pords.json", err)
		pc.SvcPordsMap = make(map[string]PortToPubport)
		rebuild()
		return
	}
	if pc.SvcPordsMap == nil {
		// The file contained an explicit null map.
		pc.SvcPordsMap = make(map[string]PortToPubport)
	}
	if len(pc.FreePords) != pordCount {
		rebuild()
	}
}
// Add reserves a free public port for svc and records the mapping.
//
// svc is the map key of the service port, protocal is stored lower-cased as
// the mapping's protocol, and port is stored as the backend port. Add
// returns an error when svc already has a mapping or when no free port is
// left.
func (pc *PordsCreater) Add(svc string, protocal string, port int) error {
	if _, ok := pc.SvcPordsMap[svc]; ok {
		return fmt.Errorf("服务已被占用")
	}

	// Walk the slots that actually exist rather than assuming a fixed
	// count: a free list restored from disk may be shorter.
	for i, free := range pc.FreePords {
		if free == 0 {
			continue // this slot's port is already handed out
		}
		if pc.SvcPordsMap == nil {
			// Allow use on a receiver whose map was never allocated.
			pc.SvcPordsMap = make(map[string]PortToPubport)
		}
		pc.SvcPordsMap[svc] = PortToPubport{
			Port:     port,
			Pubport:  free,
			Protocol: strings.ToLower(protocal),
		}
		pc.FreePords[i] = 0
		return nil
	}

	return fmt.Errorf("没有端口了")
}

// Delete removes the mapping stored under svc and returns its public port
// to the free list. When svc has no mapping, that is logged and nothing
// changes.
func (pc *PordsCreater) Delete(svc string) {
	p, ok := pc.SvcPordsMap[svc]
	if !ok {
		log.Println(svc + " 不存在")
		return
	}
	delete(pc.SvcPordsMap, svc)

	// Park the released port in the first empty (zero) slot.
	for i, free := range pc.FreePords {
		if free == 0 {
			pc.FreePords[i] = p.Pubport
			return
		}
	}
}

// Flush saves pc as JSON to pords.json in the working directory.
//
// The state is marshalled before any file is touched and is written to
// pords.json.tmp first, then renamed over pords.json, so a failure part-way
// through cannot leave a truncated state file behind. Failures are logged;
// nothing is returned.
func (pc *PordsCreater) Flush() {
	data, err := json.Marshal(pc)
	if err != nil {
		log.Println("marshal failed", err)
		return
	}

	const tmp = "pords.json.tmp"
	if err := ioutil.WriteFile(tmp, data, 0666); err != nil {
		log.Println("write pords.json ", err)
		return
	}
	if err := os.Rename(tmp, "pords.json"); err != nil {
		log.Println("replace pords.json ", err)
	}
}

// WriteCfg writes the haproxy configuration file cfg.
//
// The file starts with the text returned by readHead, followed by one
// "listen" section per mapping: it binds the mapping's public port, uses the
// mapping's protocol as mode, and lists one "server" line per entry of
// nodes, pointing at the mapping's backend port. Sections are emitted in map
// iteration order.
//
// Failing to create cfg ends the process via log.Fatal. A failed write is
// logged; the method has no return value to report it through.
func (pc *PordsCreater) WriteCfg(nodes []string, cfg string) {
	head := readHead()

	file, err := os.Create(cfg)
	if err != nil {
		log.Fatal("create haproxy.cfg ", err)
	}
	defer file.Close()

	// Buffer the output; bufio.Writer remembers the first write error and
	// reports it from Flush, so one check covers every write below.
	w := bufio.NewWriter(file)
	w.WriteString(head)
	for key, v := range pc.SvcPordsMap {
		fmt.Fprintf(w, `listen %s
  bind 0.0.0.0:%d
  mode %s
  option tcplog
  balance leastconn`, key, v.Pubport, v.Protocol)
		for index, node := range nodes {
			fmt.Fprintf(w, "\n  server %s-%d %s:%d check", key, index+1, node, v.Port)
		}
		w.WriteString("\n\n")
	}
	if err := w.Flush(); err != nil {
		log.Println("write haproxy.cfg ", err)
	}
}

// Clear drops every mapping whose key is not marked true in svcs and gives
// the public port of each dropped mapping back to the free list, so the
// port can be allocated again. svcs is printed to standard output first.
func (pc *PordsCreater) Clear(svcs map[string]bool) {
	fmt.Println(svcs)

	for k, p := range pc.SvcPordsMap {
		if svcs[k] {
			continue
		}
		delete(pc.SvcPordsMap, k)
		// Park the released port in the first empty (zero) slot; without
		// this the port would stay marked as handed out forever.
		for i, free := range pc.FreePords {
			if free == 0 {
				pc.FreePords[i] = p.Pubport
				break
			}
		}
	}
}

// ServiceMetadata carries the two metadata fields of a service that this
// file reads: its name and its namespace.
type ServiceMetadata struct {
	Name, Namespace string
}

// ServicePort is one port entry of a service spec: the node port and its
// protocol.
type ServicePort struct {
	NodePort int
	Protocol string
}

// ServiceSpec is the part of a service spec that is decoded here.
// If Type is NodePort, the service is one that has to be exposed.
type ServiceSpec struct {
	Type  string
	Ports []ServicePort
}

// ServiceObject is a service as decoded from JSON: its metadata and spec.
type ServiceObject struct {
	Metadata ServiceMetadata
	Spec     ServiceSpec
}

// Service is one decoded line of the service watch stream: Type is the
// event type (haproxy compares it with "ADDED") and Object is the service
// the event refers to.
type Service struct {
	Type   string
	Object ServiceObject
}

// init registers the command-line flags that fill k8sHost, k8sPrefix,
// haconfig and hareload, then parses the command line.
//
// NOTE(review): flag.Parse runs during package initialization here. That is
// known to interfere with `go test` (test flags are registered later) and
// with programs that define further flags; consider moving the Parse call to
// the program's entry point once that is confirmed to be safe.
func init() {
	settings := []struct {
		target             *string
		name, value, usage string
	}{
		{&k8sHost, "host", "http://127.0.0.1:8080", "k8s 主机地址"},
		{&k8sPrefix, "pre", "api/v1", "前缀"},
		{&haconfig, "haconfig", "/etc/haproxy/haproxy.cfg", "haproxy.cfg的默认路径"},
		{&hareload, "hareload", "systemctl reload haproxy", "haproxy reload 命令"},
	}
	for _, s := range settings {
		flag.StringVar(s.target, s.name, s.value, s.usage)
	}
	flag.Parse()
}

// readHead returns the full contents of head.cfg from the working
// directory. If the file cannot be opened or read, the process exits via
// log.Fatal.
func readHead() string {
	src, openErr := os.Open("head.cfg")
	if openErr != nil {
		log.Fatal("没有head.cfg文件", openErr)
	}
	defer src.Close()

	content, readErr := ioutil.ReadAll(src)
	if readErr != nil {
		log.Fatal("read head.cfg error ", readErr)
	}

	return string(content)
}

// watchSvc opens the service watch stream at
// k8sHost/k8sPrefix/watch/services, decodes it one JSON event per line, and
// sends every event whose service spec type is "NodePort" to ch.
//
// It returns when the request fails, the server answers with a status other
// than 200, or the stream ends or breaks; the reason is logged. A line that
// is not valid JSON is logged and skipped. Every decoded event is also
// printed to standard output.
func watchSvc(ch chan<- Service) {
	resp, err := http.Get(k8sHost + "/" + k8sPrefix + "/watch/services")
	if err != nil {
		log.Println("get http", err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Println("get http", resp.Status)
		return
	}

	bufrd := bufio.NewReader(resp.Body)
	for {
		// ReadBytes returns the whole line however long it is; ReadLine
		// would hand back lines longer than the buffer in pieces.
		line, readErr := bufrd.ReadBytes('\n')
		if strings.TrimSpace(string(line)) != "" {
			// Decode into a fresh value so fields missing from this event
			// cannot be inherited from the previous one.
			var svc Service
			if err := json.Unmarshal(line, &svc); err != nil {
				log.Println("decode watch event", err)
			} else {
				fmt.Println(svc)
				if svc.Object.Spec.Type == "NodePort" {
					ch <- svc
				}
			}
		}
		if readErr != nil {
			log.Println("readline error", readErr)
			return
		}
	}
}

// watchService keeps the service watch running forever: each time watchSvc
// returns, it waits 60 seconds and starts the watch again. It never returns.
func watchService(ch chan<- Service) {
	for ; ; time.Sleep(60 * time.Second) {
		watchSvc(ch)
	}
}

// AllServices is the service list response as decoded by getAllService:
// only its items are read.
type AllServices struct {
	Items []ServiceObject
}

// getAllService fetches the service list from k8sHost/k8sPrefix/services and
// returns the set of keys ("namespace-name-nodePort") of every port of every
// service whose spec type is "NodePort".
//
// Any failure ends the process via log.Fatal: the request failing, a status
// other than 200, the body being unreadable, or the body not being valid
// JSON. The result is passed to PordsCreater.Clear, which drops every
// mapping that is not in the set, so returning an empty set for a bad
// response would wipe all saved mappings.
func getAllService() map[string]bool {
	resp, err := http.Get(k8sHost + "/" + k8sPrefix + "/services")
	if err != nil {
		log.Fatal("获取服务失败，是否是正确的apiserver地址", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Fatal("获取所有服务失败 ", resp.Status)
	}

	buf, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal("获取所有服务失败", err)
	}
	var allSvc AllServices
	if err := json.Unmarshal(buf, &allSvc); err != nil {
		log.Fatal("获取所有服务失败", err)
	}

	ret := make(map[string]bool)
	for _, o := range allSvc.Items {
		if o.Spec.Type != "NodePort" {
			continue
		}
		for _, port := range o.Spec.Ports {
			svc := fmt.Sprintf("%s-%s-%d", o.Metadata.Namespace, o.Metadata.Name, port.NodePort)
			ret[svc] = true
		}
	}
	return ret
}

// NodeMetadata is the part of a node's metadata that is decoded: its name.
type NodeMetadata struct {
	Name string
}

// NodeObj is one entry of a node list.
type NodeObj struct {
	// The field name is misspelled but is kept because other code refers to
	// it. The tag maps it to the "metadata" key of the JSON document;
	// without the tag the key never matches and the name stays empty.
	Meatadata NodeMetadata `json:"metadata"`
}

// NodeInfo is the node list response as decoded by getAllNodes: only its
// items are read.
type NodeInfo struct {
	Items []NodeObj
}

// getAllNodes fetches the node list from k8sHost/k8sPrefix/nodes and returns
// the node names in the order the server lists them.
//
// It returns nil, after logging the reason, when the request fails, the
// server answers with a status other than 200, or the body cannot be read or
// decoded. On success the result has exactly one entry per listed node, so
// it is empty but non-nil when the list has no nodes.
func getAllNodes() []string {
	resp, err := http.Get(k8sHost + "/" + k8sPrefix + "/nodes")
	if err != nil {
		log.Println("获取nodes失败", err)
		return nil
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		log.Println("获取nodes失败", resp.Status)
		return nil
	}

	buf, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println("读取nodes错误")
		return nil
	}
	var nodeInfo NodeInfo
	if err := json.Unmarshal(buf, &nodeInfo); err != nil {
		log.Println("读取nodes错误", err)
		return nil
	}

	// Size the result from the response instead of a fixed capacity.
	ret := make([]string, 0, len(nodeInfo.Items))
	for _, node := range nodeInfo.Items {
		ret = append(ret, node.Meatadata.Name)
	}
	return ret
}

// watchNodes polls getAllNodes every 10 seconds, forever, and sends the node
// list to ch whenever it differs from the last list that was sent.
//
// A list counts as different when nothing has been sent yet (or the last
// list sent was nil), when the lengths differ, or when it contains a name
// that the last list sent did not contain.
func watchNodes(ch chan<- []string) {
	var last []string

	// known reports whether name was part of the last list that was sent.
	known := func(name string) bool {
		for _, old := range last {
			if old == name {
				return true
			}
		}
		return false
	}

	for {
		current := getAllNodes()

		differs := last == nil || len(last) != len(current)
		// Same length: stop looking at the first name that is new.
		for i := 0; !differs && i < len(current); i++ {
			differs = !known(current[i])
		}

		if differs {
			ch <- current
			last = current
		}
		time.Sleep(10 * time.Second)
	}
}

// haproxy runs the controller loop and never returns.
//
// One goroutine (watchService) reads service changes and puts them on a
// channel; this loop reads them, updates the port mappings, and
// periodically saves the mappings and rewrites the haproxy configuration.
// Another goroutine (watchNodes) polls the node list every 10 seconds and
// reports changes, which also trigger a rewrite.
//
// Event handling:
//   - "ADDED" and "MODIFIED" service events allocate a public port for every
//     node port that has no mapping yet;
//   - "DELETED" service events release the mappings of the service's ports;
//   - other event types are ignored.
//
// A flush is scheduled 5 seconds after a service event and immediately after
// a node change. Nothing is written until a node list has been received.
//
// NOTE(review): a MODIFIED event that removes a port from a service leaves
// that port's mapping in place until the next start, when Clear drops it.
func haproxy() {
	var pc PordsCreater
	pc.Init()
	svcs := getAllService()
	pc.Clear(svcs)

	var chNode = make(chan []string)
	go watchNodes(chNode)
	var nodes []string

	var ch = make(chan Service)
	go watchService(ch)

	t := time.NewTimer(time.Second)
	// resetTimer re-arms t. A tick that already fired but was not consumed
	// is discarded first, so it cannot trigger an extra, early flush.
	resetTimer := func(d time.Duration) {
		if !t.Stop() {
			select {
			case <-t.C:
			default:
			}
		}
		t.Reset(d)
	}

	// Split the reload command on any run of whitespace. An empty setting
	// leaves command empty, which makes the reload fail and be logged.
	var command string
	var args []string
	if parts := strings.Fields(hareload); len(parts) > 0 {
		command, args = parts[0], parts[1:]
	}

	for {
		select {
		case s := <-ch:
			prefix := fmt.Sprintf("%s-%s-", s.Object.Metadata.Namespace, s.Object.Metadata.Name)
			for _, port := range s.Object.Spec.Ports {
				svc := fmt.Sprintf("%s%d", prefix, port.NodePort)
				switch s.Type {
				case "ADDED", "MODIFIED":
					// Each new watch connection reports existing services
					// again; ports that are already mapped need no work.
					if _, ok := pc.SvcPordsMap[svc]; ok {
						continue
					}
					if err := pc.Add(svc, port.Protocol, port.NodePort); err != nil {
						log.Println("add", svc, err)
					}
				case "DELETED":
					pc.Delete(svc)
				}
			}
			resetTimer(5 * time.Second)

		case n := <-chNode:
			nodes = n
			resetTimer(1 * time.Nanosecond)

		case <-t.C:
			log.Println("flush")
			if nodes == nil {
				// No node list yet: try again later.
				resetTimer(5 * time.Second)
				continue
			}
			pc.Flush()
			pc.WriteCfg(nodes, haconfig)
			log.Println(haconfig, hareload, command)
			log.Println(args)
			cmd := exec.Command(command, args...)
			if err := cmd.Run(); err != nil {
				log.Println("hareload 命令执行失败", err)
			}
		}
	}
}
